// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Declaration of PacketThinnerCalculator.

#include <cmath> // for ceil
#include <cstdint>
#include <memory>

#include "absl/log/absl_check.h"
#include "mediapipe/calculators/core/packet_thinner_calculator.pb.h"
#include "mediapipe/framework/calculator_context.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/formats/video_stream_header.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/tool/options_util.h"

namespace mediapipe
{

  namespace
  {
    // Number of microseconds in one second; used to convert between a thinning
    // period (in microseconds) and a frame rate (in Hz).
    constexpr double kTimebaseUs = 1e6;
    // Tag of the optional input side packet carrying CalculatorOptions.
    constexpr const char *kOptionsTag = "OPTIONS";
    // Tag of the optional input side packet carrying the period as an int64_t.
    constexpr const char *kPeriodTag = "PERIOD";
  } // namespace

  // This calculator is used to thin an input stream of Packets.
  // An example application would be to sample decoded frames of video
  // at a coarser temporal resolution. Unless otherwise stated, all
  // timestamps are in units of microseconds.
  //
  // Thinning can be accomplished in one of two ways:
  // 1) asynchronous thinning (known below as async):
  //    Algorithm does not rely on a master clock and is parameterized only
  //    by a single option -- the period.  Once a packet is emitted, the
  //    thinner will discard subsequent packets for the duration of the period
  //    [Analogous to a refractory period during which packet emission is
  //    suppressed.]
  //    Packets arriving before start_time are discarded, as are packets
  //    arriving at or after end_time.
  // 2) synchronous thinning (known below as sync):
  //    There are two variants of this algorithm, both parameterized by a
  //    start_time and a period.  As in (1), packets arriving before start_time
  //    or at/after end_time are discarded.  Otherwise, at most one packet is
  //    emitted during a period, centered at timestamps generated by the
  //    expression:
  //      start_time + i * period  [where i is a non-negative integer]
  //    During each period, the packet closest to the generated timestamp is
  //    emitted (latest in the case of ties).  In the first variant
  //    (sync_output_timestamps = true), the emitted packet is output at the
  //    generated timestamp.  In the second variant, the packet is output at
  //    its original timestamp.  Both variants emit exactly the same packets,
  //    but at different timestamps.
  //
  // Thinning period can be provided in the calculator options or via a
  // side packet with the tag "PERIOD".
  //
  // Calculator options provided optionally with the "OPTIONS" input
  // sidepacket tag will be merged with this calculator's node options, i.e.,
  // singular fields of the side packet will overwrite the options defined in the
  // node, and repeated fields will concatenate.
  //
  // Example config:
  // node {
  //   calculator: "PacketThinnerCalculator"
  //   input_side_packet: "OPTIONS:calculator_options"
  //   input_stream: "signal"
  //   output_stream: "output"
  //   options {
  //     [mediapipe.PacketThinnerCalculatorOptions.ext] {
  //       thinner_type: SYNC
  //       period: 10
  //       sync_output_timestamps: true
  //       update_frame_rate: false
  //     }
  //   }
  // }
  class PacketThinnerCalculator : public CalculatorBase
  {
  public:
    PacketThinnerCalculator() {}
    ~PacketThinnerCalculator() override {}

    static absl::Status GetContract(CalculatorContract *cc)
    {
      if (cc->InputSidePackets().HasTag(kOptionsTag))
      {
        cc->InputSidePackets().Tag(kOptionsTag).Set<CalculatorOptions>();
      }
      cc->Inputs().Index(0).SetAny();
      cc->Outputs().Index(0).SetSameAs(&cc->Inputs().Index(0));
      if (cc->InputSidePackets().HasTag(kPeriodTag))
      {
        cc->InputSidePackets().Tag(kPeriodTag).Set<int64_t>();
      }
      return absl::OkStatus();
    }

    absl::Status Open(CalculatorContext *cc) override;
    absl::Status Close(CalculatorContext *cc) override;
    absl::Status Process(CalculatorContext *cc) override
    {
      //ABSL_LOG(INFO) << "Process PacketThinnerCalculator";

      if (cc->InputTimestamp() < start_time_)
      {
        return absl::OkStatus(); // Drop packets before start_time_.
      }
      else if (cc->InputTimestamp() >= end_time_)
      {
        if (!cc->Outputs().Index(0).IsClosed())
        {
          cc->Outputs()
              .Index(0)
              .Close(); // No more Packets will be output after end_time_.
        }
        return absl::OkStatus();
      }
      else
      {
        return thinner_type_ == PacketThinnerCalculatorOptions::ASYNC
                   ? AsyncThinnerProcess(cc)
                   : SyncThinnerProcess(cc);
      }
    }

  private:
    // Implementation of ASYNC and SYNC versions of thinner algorithm.
    absl::Status AsyncThinnerProcess(CalculatorContext *cc);
    absl::Status SyncThinnerProcess(CalculatorContext *cc);

    // Cached option.
    PacketThinnerCalculatorOptions::ThinnerType thinner_type_;

    // Given a Timestamp, finds the closest sync Timestamp
    // based on start_time_ and period_.  This can be earlier or
    // later than given Timestamp, but is guaranteed to be within
    // half a period_.
    Timestamp NearestSyncTimestamp(Timestamp now) const;

    // Cached option used by both async and sync thinners.
    TimestampDiff period_; // Interval during which only one packet is emitted.
    Timestamp start_time_; // Cached option - default Timestamp::Min()
    Timestamp end_time_;   // Cached option - default Timestamp::Max()

    // Only used by async thinner:
    Timestamp next_valid_timestamp_; // Suppress packets until this timestamp.

    // Only used by sync thinner:
    Packet saved_packet_;         // Best packet not yet emitted.
    bool sync_output_timestamps_; // Cached option.
  };
  REGISTER_CALCULATOR(PacketThinnerCalculator);

  namespace
  {
    // Returns the magnitude of a (possibly negative) timestamp difference.
    TimestampDiff abs(TimestampDiff t)
    {
      if (t < 0)
      {
        return -t;
      }
      return t;
    }
  } // namespace

  // Resolves the effective options (node options merged with the optional
  // "OPTIONS" side packet), caches them in the calculator's members, sets the
  // initial timestamp bound of the output and propagates the input stream
  // header.
  //
  // The thinning period is taken from the "PERIOD" side packet when present and
  // from the options otherwise.  When `update_frame_rate` is set and the input
  // carries a VideoHeader, the output header advertises the frame rate expected
  // after thinning; otherwise the input header is forwarded unchanged.
  //
  // CHECK-fails on an unsupported thinner type, a non-positive period, or
  // start_time >= end_time.
  absl::Status PacketThinnerCalculator::Open(CalculatorContext *cc)
  {
    const PacketThinnerCalculatorOptions options =
        mediapipe::tool::RetrieveOptions(
            cc->Options<PacketThinnerCalculatorOptions>(),
            cc->InputSidePackets(), kOptionsTag);

    thinner_type_ = options.thinner_type();
    // This check enables us to assume only two thinner types exist in Process()
    ABSL_CHECK(thinner_type_ == PacketThinnerCalculatorOptions::ASYNC ||
               thinner_type_ == PacketThinnerCalculatorOptions::SYNC)
        << "Unsupported thinner type.";
    const bool is_async = thinner_type_ == PacketThinnerCalculatorOptions::ASYNC;

    if (is_async)
    {
      // ASYNC thinner outputs packets with the same timestamp as their input so
      // it's safe to SetOffset(0). SYNC thinner manipulates timestamps of its
      // output so we don't do this for that case.
      cc->SetOffset(0);
    }

    // The side packet, when connected, takes precedence over the options.
    if (cc->InputSidePackets().HasTag(kPeriodTag))
    {
      period_ =
          TimestampDiff(cc->InputSidePackets().Tag(kPeriodTag).Get<int64_t>());
    }
    else
    {
      period_ = TimestampDiff(options.period());
    }
    ABSL_CHECK_LT(TimestampDiff(0), period_)
        << "Specified period must be positive.";

    if (options.has_start_time())
    {
      start_time_ = Timestamp(options.start_time());
    }
    else if (is_async)
    {
      start_time_ = Timestamp::Min();
    }
    else
    {
      start_time_ = Timestamp(0);
    }

    end_time_ =
        options.has_end_time() ? Timestamp(options.end_time()) : Timestamp::Max();
    ABSL_CHECK_LT(start_time_, end_time_)
        << "Invalid PacketThinner: start_time must be earlier than end_time";

    sync_output_timestamps_ = options.sync_output_timestamps();

    next_valid_timestamp_ = start_time_;
    // Drop packets until this time.
    cc->Outputs().Index(0).SetNextTimestampBound(start_time_);

    if (cc->Inputs().Index(0).Header().IsEmpty())
    {
      return absl::OkStatus(); // No header to propagate.
    }

    if (!options.update_frame_rate())
    {
      cc->Outputs().Index(0).SetHeader(cc->Inputs().Index(0).Header());
      return absl::OkStatus();
    }

    const VideoHeader &video_header =
        cc->Inputs().Index(0).Header().Get<VideoHeader>();
    // Use the *effective* period (period_), not options.period(): the two
    // differ whenever the period is supplied through the "PERIOD" side packet,
    // and the advertised frame rate must match the period actually applied.
    const double period_us = static_cast<double>(period_.Value());
    double new_frame_rate;
    if (is_async)
    {
      // One packet is emitted every ceil(frames per period) input frames.
      new_frame_rate =
          video_header.frame_rate /
          std::ceil(video_header.frame_rate * period_us / kTimebaseUs);
    }
    else
    {
      // At most one packet per period, and never more than the input provides.
      const double sampling_rate = kTimebaseUs / period_us;
      new_frame_rate = video_header.frame_rate < sampling_rate
                           ? video_header.frame_rate
                           : sampling_rate;
    }

    auto header = std::make_unique<VideoHeader>();
    header->format = video_header.format;
    header->width = video_header.width;
    header->height = video_header.height;
    header->duration = video_header.duration;
    header->frame_rate = new_frame_rate;
    cc->Outputs().Index(0).SetHeader(Adopt(header.release()));

    return absl::OkStatus();
  }

  // Flushes the packet still held back by the sync thinner, if there is one.
  // With sync_output_timestamps_ the packet is re-stamped to its nearest sync
  // timestamp; otherwise it is emitted with its original timestamp.
  absl::Status PacketThinnerCalculator::Close(CalculatorContext *cc)
  {
    if (saved_packet_.IsEmpty())
    {
      return absl::OkStatus(); // Nothing pending.
    }

    // Only sync thinner should have saved packets.
    ABSL_CHECK_EQ(PacketThinnerCalculatorOptions::SYNC, thinner_type_);

    if (!sync_output_timestamps_)
    {
      cc->Outputs().Index(0).AddPacket(saved_packet_);
      return absl::OkStatus();
    }

    const Timestamp sync_time = NearestSyncTimestamp(saved_packet_.Timestamp());
    cc->Outputs().Index(0).AddPacket(saved_packet_.At(sync_time));
    return absl::OkStatus();
  }

  // ASYNC thinning: forwards the current packet unless it falls inside the
  // refractory period started by the previously emitted packet.
  absl::Status PacketThinnerCalculator::AsyncThinnerProcess(
      CalculatorContext *cc)
  {
    const Timestamp now = cc->InputTimestamp();
    if (now < next_valid_timestamp_)
    {
      return absl::OkStatus(); // Still suppressed -- drop the packet.
    }

    auto &output = cc->Outputs().Index(0);
    output.AddPacket(cc->Inputs().Index(0).Value()); // Emit current packet.

    // Start a new refractory period; nothing will be emitted before it ends,
    // which is announced downstream via the timestamp bound.
    next_valid_timestamp_ = now + period_;
    output.SetNextTimestampBound(next_valid_timestamp_);
    return absl::OkStatus();
  }

  // SYNC thinning: keeps, per sync interval, the packet closest to the
  // interval's sync timestamp.  The retained packet is emitted as soon as a
  // packet belonging to a later interval shows up.
  absl::Status PacketThinnerCalculator::SyncThinnerProcess(
      CalculatorContext *cc)
  {
    auto &output = cc->Outputs().Index(0);
    const Packet &incoming = cc->Inputs().Index(0).Value();
    const Timestamp now = cc->InputTimestamp();

    // Very first packet: nothing to compare against, just remember it.
    if (saved_packet_.IsEmpty())
    {
      saved_packet_ = incoming;
      output.SetNextTimestampBound(
          sync_output_timestamps_ ? NearestSyncTimestamp(now) : now);
      return absl::OkStatus();
    }

    const Timestamp saved = saved_packet_.Timestamp();
    const Timestamp saved_sync = NearestSyncTimestamp(saved);
    const Timestamp now_sync = NearestSyncTimestamp(now);
    ABSL_CHECK_LE(saved_sync, now_sync);

    if (saved_sync != now_sync)
    {
      // The current packet opens a new interval, so the saved packet is the
      // best one of the earlier interval: emit it ...
      if (sync_output_timestamps_)
      {
        output.AddPacket(saved_packet_.At(saved_sync));
        output.SetNextTimestampBound(now_sync);
      }
      else
      {
        output.AddPacket(saved_packet_);
        output.SetNextTimestampBound(now);
      }
      // ... and start tracking the new interval with the current packet.
      saved_packet_ = incoming;
      return absl::OkStatus();
    }

    // Same interval as the saved packet: keep whichever is closer to the sync
    // timestamp, preferring the fresher (current) packet on ties.
    if (abs(now - now_sync) <= abs(saved - saved_sync))
    {
      saved_packet_ = incoming;
    }
    return absl::OkStatus();
  }

  // Returns the sync timestamp (start_time_ + i * period_, i an integer)
  // closest to `now`.  When `now` lies exactly halfway between two sync
  // timestamps the later one is returned.  The result is within half a period
  // of `now`, whether `now` is before or after start_time_.
  //
  // CHECK-fails if start_time_ is unset or period_ is not strictly positive.
  Timestamp PacketThinnerCalculator::NearestSyncTimestamp(Timestamp now) const
  {
    ABSL_CHECK_NE(start_time_, Timestamp::Unset())
        << "Method only valid for sync thinner calculator.";

    // Computation is done using int64 arithmetic.  No easy way to avoid
    // since Timestamps don't support div and multiply.
    const int64_t now64 = now.Value();
    const int64_t start64 = start_time_.Value();
    const int64_t period64 = period_.Value();
    // Must be strictly positive: period64 is used as a divisor below.
    ABSL_CHECK_LT(0, period64);

    // Round now64 to its closest interval (units of period64).  C++ integer
    // division truncates toward zero, so the quotient is adjusted to a floor
    // division; otherwise timestamps earlier than start64 would be rounded in
    // the wrong direction.
    const int64_t shifted = now64 - start64 + period64 / 2;
    int64_t intervals = shifted / period64;
    if (shifted % period64 < 0)
    {
      --intervals;
    }
    const int64_t sync64 = intervals * period64 + start64;
    ABSL_CHECK_LE(abs(now64 - sync64), period64 / 2)
        << "start64: " << start64 << "; now64: " << now64
        << "; sync64: " << sync64;

    return Timestamp(sync64);
  }

} // namespace mediapipe
